Add temporal Gemini video analysis and CloudEvents publishing pipeline (#12)
Conversation
Co-authored-by: groupthinking <154503486+groupthinking@users.noreply.github.com>
```python
    **kwargs
)

logger.info(f"Publishing CloudEvent: type={type}, id={event.id}")
```
Check failure: Code scanning / CodeQL: Log Injection (High)

Copilot Autofix (AI, about 1 month ago)
General fix strategy: sanitize or normalize any user-controlled string before including it in log messages, at least by removing or replacing carriage return and newline characters, and ideally by clearly marking or restricting user input. Here, the minimal, non-breaking fix is to sanitize `type` / `event_type` prior to logging without changing the parameters passed to the rest of the system.

Best concrete fix in this codebase:

- In `CloudEventsPublisher.publish`, derive a sanitized version of the `type` argument (e.g., by replacing `\r` and `\n` with empty strings) and use that in the log message instead of the raw `type`. This keeps the actual CloudEvent unchanged (it still uses the original `type` argument) and only affects the log output.
- Implement the sanitization inline in the logging call (or just above it), avoiding changes to method signatures or additional imports. The standard `str.replace` method is sufficient and does not require new dependencies.

Specific change:

- File: `src/integration/cloudevents_publisher.py`
- In the `publish` method of `CloudEventsPublisher`, change line 171 from `logger.info(f"Publishing CloudEvent: type={type}, id={event.id}")` to use a sanitized local variable, for example: `safe_type = str(type).replace("\r", "").replace("\n", "")` followed by `logger.info(f"Publishing CloudEvent: type={safe_type}, id={event.id}")`.
- No changes are required in `advanced_video_routes.py` for this particular log injection issue, since the problematic sink is in the publisher.

This keeps functionality intact (the event published is identical), and only the logged representation of `type` is sanitized to prevent log injection.
```diff
@@ -168,7 +168,8 @@
         **kwargs
     )

-    logger.info(f"Publishing CloudEvent: type={type}, id={event.id}")
+    safe_type = str(type).replace("\r", "").replace("\n", "")
+    logger.info(f"Publishing CloudEvent: type={safe_type}, id={event.id}")

     try:
         if self.backend == "pubsub":
```
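If the same sanitization is needed at several logging sites, it can be factored into a small helper. A minimal sketch (the helper name is hypothetical, not part of the PR):

```python
def sanitize_for_log(value: object) -> str:
    """Strip CR/LF so attacker-controlled values cannot forge extra log lines."""
    return str(value).replace("\r", "").replace("\n", "")

# A value containing a newline can no longer start a fake log entry
safe = sanitize_for_log("video.analyzed\r\nERROR forged entry")
```

With such a helper, each call site stays a single expression, e.g. `logger.info(f"Publishing CloudEvent: type={sanitize_for_log(type)}")`.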
```python
elif self.backend == "file":
    return await self._publish_file(event)
else:
    logger.error(f"Unsupported backend: {self.backend}")
```
Check failure: Code scanning / CodeQL: Log Injection (High)

Copilot Autofix (AI, about 1 month ago)
In general, to fix log injection when logging user-controlled data, either (a) validate the value and restrict it to a safe, known set of options before logging, or (b) sanitize the value to remove or neutralize characters that can break log structure (such as `\r` and `\n`) or other control sequences. It is preferable to validate against a whitelist of allowed values when the domain is small and known, as is the case for `backend`.

For this specific case, the simplest way to fix the problem without changing existing functionality is to avoid logging the raw `self.backend` value when it is not one of the known backends, and instead log a sanitized or redacted representation. Since the set of valid backends is already encoded in the `Literal` type and in the subsequent if/elif chain, we can compute a "safe" version of the backend string that strips newline characters before logging. This keeps the informational content ("what backend was requested") without allowing injection of extra log lines. Concretely, inside the `publish` method of `CloudEventsPublisher`, we will introduce a local variable `safe_backend` in the unsupported-backend branch, computed from `self.backend` using `.replace('\r', '').replace('\n', '')`, and use that in the log message instead of `self.backend`.

We only need to modify `src/integration/cloudevents_publisher.py`. No changes are required in `advanced_video_routes.py` because the fix is at the sink (the logging call). The change is localized to the else branch at lines 182–184 (around the `logger.error(f"Unsupported backend: {self.backend}")` call). No new imports or helper methods are required.
```diff
@@ -180,7 +180,8 @@
     elif self.backend == "file":
         return await self._publish_file(event)
     else:
-        logger.error(f"Unsupported backend: {self.backend}")
+        safe_backend = str(self.backend).replace("\r", "").replace("\n", "")
+        logger.error(f"Unsupported backend: {safe_backend}")
         return None
 except Exception as e:
     logger.error(f"Failed to publish CloudEvent: {e}", exc_info=True)
```
```python
response.raise_for_status()

logger.info(
    f"Published CloudEvent {event.id} to OpenWhisk trigger: {trigger_name}"
```
Check failure: Code scanning / CodeQL: Log Injection (High)

Copilot Autofix (AI, about 1 month ago)
In general, to fix log injection, any user-provided value written to logs should be normalized so that it cannot inject new log entries or otherwise alter the log format. For plain-text logs, this typically means stripping carriage returns and newlines, and optionally other non-printable control characters, from user input before including it in log messages. The underlying functionality (publishing events) should remain unchanged; only the logging message needs to use a sanitized representation of the user input.

For this specific issue, we should ensure that `trigger_name` used in the log message in `_publish_openwhisk` is sanitized. The best minimal-change approach is:

- Keep using the original `trigger_name` value for constructing the OpenWhisk URL and making the HTTP request (so runtime behavior is unchanged).
- Introduce a sanitized version of the string that removes `\r` and `\n` (and optionally other control characters) right before logging.
- Log the sanitized value instead of the raw `trigger_name`.

Concretely, in `src/integration/cloudevents_publisher.py`:

- Inside `_publish_openwhisk`, after `trigger_name` is computed and the HTTP call succeeds, create a `safe_trigger_name` variable derived from `trigger_name` with line breaks removed (e.g., `trigger_name.replace("\r", "").replace("\n", "")`).
- Change the `logger.info` call on lines 284–286 to interpolate `safe_trigger_name` instead of `trigger_name`.

No additional imports or global helpers are strictly necessary for this small scope; we can do the sanitization inline.
```diff
@@ -281,8 +281,10 @@
     )
     response.raise_for_status()

+    # Sanitize trigger name before logging to prevent log injection
+    safe_trigger_name = trigger_name.replace("\r", "").replace("\n", "")
     logger.info(
-        f"Published CloudEvent {event.id} to OpenWhisk trigger: {trigger_name}"
+        f"Published CloudEvent {event.id} to OpenWhisk trigger: {safe_trigger_name}"
     )
     return event.id
 except Exception as e:
```
| """ | ||
|
|
||
| import asyncio | ||
| import json |
Check notice: Code scanning / CodeQL: Unused import (Note)

Copilot Autofix (AI, about 1 month ago)
To fix an unused import, remove the import statement for the module that is never referenced in the file. This simplifies dependencies, speeds up startup slightly, and removes the static analysis warning.

In this case, the best minimal fix is to delete the line `import json` from `examples/complete_workflow_example.py`. No other code changes are needed because nothing in the file references `json`. Specifically, remove line 15 while keeping the surrounding imports (`asyncio`, `os`, `sys`, `Path`) unchanged.
```diff
@@ -12,7 +12,6 @@
 """

 import asyncio
-import json
 import os
 import sys
 from pathlib import Path
```
```python
import asyncio
import json
import os
```
Check notice: Code scanning / CodeQL: Unused import (Note)

Copilot Autofix (AI, about 1 month ago)
In general, unused import issues are fixed by either removing the import or using the imported symbol if it was unintentionally omitted from the code. Here, the best fix that does not change existing behavior is to delete the unused `import os` line.

Specifically, in `examples/complete_workflow_example.py`, remove the line `import os` at line 16 and leave the remaining imports unchanged. No new methods, imports, or definitions are needed; this is a simple cleanup and does not affect the runtime behavior of the script.
```diff
@@ -13,7 +13,6 @@

 import asyncio
 import json
-import os
 import sys
 from pathlib import Path
```
```python
try:
    if isinstance(result.summary, str) and result.summary.strip().startswith("{"):
        return json.loads(result.summary)
except json.JSONDecodeError:
```
Check notice: Code scanning / CodeQL: Empty except (Note)

Copilot Autofix (AI, about 1 month ago)
In general, empty except blocks should either handle the error meaningfully (e.g., logging, metrics, recovery) or narrow their scope. Here, we can keep the existing functional behavior, falling back to a simple dictionary with the raw summary, while adding logging to record that JSON parsing failed.

The best minimal fix is to replace the `pass` in the `except json.JSONDecodeError:` block around line 347 with a logging statement, similar in style to the one already used in `extract_tutorial_steps`. We can log a warning that includes a short message and optionally a truncated version of `result.summary` for context. No new imports are needed because `logger` is already defined at the top of the file and `json` is already imported.

Concretely:

- In `src/integration/temporal_video_analysis.py`, in the method that returns the comparison analysis (around lines 337–350), update the `except json.JSONDecodeError:` block so that it logs a warning instead of silently passing.
- Keep the existing fallback `return {"comparison": result.summary}` unchanged so that the behavior seen by callers does not change.
```diff
@@ -345,7 +345,7 @@
         if isinstance(result.summary, str) and result.summary.strip().startswith("{"):
             return json.loads(result.summary)
     except json.JSONDecodeError:
-        pass
+        logger.warning("Could not parse comparison analysis JSON")

     return {"comparison": result.summary}
```
```python
import json
import os
from datetime import datetime, timezone
```
Check notice: Code scanning / CodeQL: Unused import (Note, test)

Copilot Autofix (AI, about 1 month ago)
To fix an unused import, remove the unused symbol from the import statement (or the entire import if nothing from it is used). Here, `datetime` is used and `timezone` is not, so we should keep importing `datetime` while dropping `timezone`.

Concretely, in `tests/unit/test_cloudevents_publisher.py`, locate the line `from datetime import datetime, timezone` (line 9) and modify it to import only `datetime`: `from datetime import datetime`. No other code changes are required, since there are no references to `timezone` in the provided code. This removes the unnecessary dependency and resolves the CodeQL warning without affecting existing functionality.
```diff
@@ -6,7 +6,7 @@

 import json
 import os
-from datetime import datetime, timezone
+from datetime import datetime
 from unittest.mock import AsyncMock, MagicMock, patch

 import pytest
```
```python
result = await service.generate_content_async(
    contents,
    response_schema=request.schema
)
```
Bug: The code calls `service.generate_content_async()`, but this method does not exist on the `GeminiService` object, which will cause a runtime `AttributeError`.

Severity: CRITICAL

Suggested Fix: Replace the call to the non-existent `generate_content_async()` method with a suitable existing async method from `GeminiService`, such as `process_youtube()`, `process_text()`, or `process_video()`, which supports structured output via the `response_schema` parameter.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: src/youtube_extension/backend/api/advanced_video_routes.py#L402-L405
Potential issue: The `/analyze/structured` endpoint at `advanced_video_routes.py`
attempts to call `service.generate_content_async()`. However, the `GeminiService` class
does not have a method with this name. When this endpoint is invoked, the application
will raise an `AttributeError: 'GeminiService' object has no attribute
'generate_content_async'`, causing the request to fail. The developer likely intended to
use an existing method such as `process_youtube()` which supports the `response_schema`
parameter.
```python
    data={
        "timestamp": evt.timestamp,
        "description": evt.description,
        "confidence": evt.confidence,
        "metadata": evt.metadata
    },
    subject=request.video_url
)
if event_id:
    published_ids.append(event_id)
await publisher.close()
```
Bug: The `CloudEventsPublisher` is not closed within a `finally` block, causing a resource leak if an exception occurs during event publishing.

Severity: HIGH

Suggested Fix: Refactor the code to use a `try...finally` block. Initialize the `CloudEventsPublisher` to `None` before the `try` block, create it inside the `try`, and call `await publisher.close()` within the `finally` block if the publisher object was successfully created. This ensures resource cleanup occurs even if an exception is raised.
Prompt for AI Agent
Location: src/youtube_extension/backend/api/advanced_video_routes.py#L188-L203
Potential issue: In the `/temporal/events`, `/analyze/structured`, and `/publish-event`
endpoints, a `CloudEventsPublisher` is created within a `try` block. If an exception
occurs after the publisher is created but before `publisher.close()` is called (e.g.,
during a `publisher.publish()` network request), the `except` block is entered and the
`close()` method is never awaited. This leaks the underlying `httpx.AsyncClient`, which
can lead to resource exhaustion (file descriptors, memory) over time.
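A runnable sketch of the suggested `try...finally` pattern, using a stand-in publisher class (the real `CloudEventsPublisher` wraps an `httpx.AsyncClient`):

```python
import asyncio

class StubPublisher:
    """Stand-in for CloudEventsPublisher that records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    async def publish(self, **kwargs) -> str:
        raise RuntimeError("simulated network failure")

    async def close(self) -> None:
        self.closed = True

async def publish_with_cleanup() -> StubPublisher:
    publisher = None
    try:
        publisher = StubPublisher()
        await publisher.publish(type="com.example.video.event", data={})
    except Exception:
        pass  # the real endpoint would return an HTTP error response here
    finally:
        # Runs even when publish() raises, so the underlying client is not leaked
        if publisher is not None:
            await publisher.close()
    return publisher

publisher = asyncio.run(publish_with_cleanup())
```

An async context manager (`async with CloudEventsPublisher(...) as publisher:`) would achieve the same guarantee with less boilerplate, if the class grows `__aenter__`/`__aexit__` support.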
```python
)

# Basic auth
username, password = self.openwhisk_auth.split(":")
```
Bug: The OpenWhisk auth string is parsed using `split(":")`, which fails if the password contains a colon, causing the publishing feature to fail silently.

Severity: MEDIUM

Suggested Fix: Change the parsing logic from `self.openwhisk_auth.split(":")` to `self.openwhisk_auth.split(":", 1)`. This will ensure the string is split only on the first colon, correctly separating the username from a password that may contain subsequent colons.
Prompt for AI Agent
Location: src/integration/cloudevents_publisher.py#L274
Potential issue: The code for parsing the OpenWhisk authentication string in
`cloudevents_publisher.py` uses `self.openwhisk_auth.split(":")`. This will raise a
`ValueError` if the password contains a colon (a valid scenario for HTTP Basic Auth) or
if the auth string is misconfigured and contains no colon. While the exception is caught
and logged, it causes the OpenWhisk publishing feature to silently fail for users with
valid credentials that include a colon in the password.
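The failure mode and the fix are easy to demonstrate (the credential string is illustrative):

```python
# HTTP Basic Auth allows colons in the password, so a plain split fails
auth = "whisk-user:secret:with:colons"

try:
    username, password = auth.split(":")  # 4 parts -> ValueError on unpacking
    unpack_failed = False
except ValueError:
    unpack_failed = True

# maxsplit=1 splits only on the first colon, preserving the password intact
username, password = auth.split(":", 1)
```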
Pull request overview
Adds "advanced video analysis" capabilities on top of the existing YouTube→Gemini pipeline: timestamp-focused (temporal) extraction, schema-constrained (structured) outputs, and CloudEvents publishing to external execution backends.

Changes:

- Introduces `TemporalVideoAnalyzer` for segment/timeline/event extraction and temporal Q&A.
- Adds `CloudEventsPublisher` with Pub/Sub, HTTP webhook, OpenWhisk, and file backends.
- Adds a new FastAPI router with endpoints for temporal analysis, structured analysis, and event publishing; plus accompanying tests/docs/examples.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 23 comments.
Summary per file:

| File | Description |
|---|---|
| `src/integration/temporal_video_analysis.py` | New temporal prompt strategies + helpers (segments/events/timeline/etc.). |
| `src/integration/cloudevents_publisher.py` | New CloudEvents v1.0 model + multi-backend publisher. |
| `src/youtube_extension/backend/api/advanced_video_routes.py` | New API surface for temporal/structured analysis and CloudEvents publishing. |
| `tests/unit/test_temporal_video_analysis.py` | Unit tests for the temporal analyzer. |
| `tests/unit/test_cloudevents_publisher.py` | Unit tests for CloudEvents serialization + each backend. |
| `docs/ADVANCED_VIDEO_FEATURES.md` | Long-form docs for the new feature set. |
| `docs/API_QUICK_REFERENCE.md` | Copy/paste API examples for the new endpoints. |
| `docs/IMPLEMENTATION_SUMMARY.md` | Implementation status report and activation steps. |
| `examples/complete_workflow_example.py` | End-to-end example script combining temporal extraction + publishing. |
| `README.md` | API Reference updated to list new endpoints/docs. |
| `IMPLEMENTATION_COMPLETE.md` | "Implementation complete" marker doc + next steps. |
```python
video_url = "https://youtube.com/watch?v=dQw4w9WgXcQ"
```
This example uses the banned test video ID `dQw4w9WgXcQ`. Replace it with the repo's standard test video ID (`auJzb1D-fag`) to avoid violating test-data conventions and automated checks.
```python
# YouTube URL (direct)
result = await service.analyze_video(
    video_url="https://youtube.com/watch?v=dQw4w9WgXcQ",
    prompt="Analyze this video",
    media_resolution="high",  # Use 'high' for text-heavy content
    thinking_level="high"  # Use 'high' for complex reasoning
)
```
This doc uses the banned test video ID `dQw4w9WgXcQ` in examples. Replace it with `auJzb1D-fag` (the repo's required default test video) throughout this document to stay consistent with test-data policy.
```python
@router.post("/publish-event")
async def publish_video_event(
    source: str,
    event_type: str,
    data: Dict,
    subject: Optional[str] = None,
    backend: Optional[str] = None
):
```
`publish_video_event` is documented as accepting a JSON body, but the handler signature uses plain function parameters (`source`, `event_type`, `data`, etc.). In FastAPI this means these values are treated as query parameters, so a JSON POST body like the docs show will not validate. Define a Pydantic request model (or use explicit `Body(...)`) so the endpoint matches its documented request shape.
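A sketch of such a request model (field names mirror the handler's parameters; the model name is illustrative, not the PR's code). In FastAPI, a single Pydantic-model parameter is read from the JSON request body rather than from query parameters:

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel

class PublishEventRequest(BaseModel):
    """JSON body for POST /publish-event."""
    source: str
    event_type: str
    data: Dict[str, Any]
    subject: Optional[str] = None
    backend: Optional[str] = None

# The handler would then be declared as:
#   async def publish_video_event(request: PublishEventRequest): ...
req = PublishEventRequest(
    source="/video/analyzer",
    event_type="com.eventrelay.video.analyzed",
    data={"ok": True},
)
```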
| type="com.eventrelay.video.analyzed.structured", | ||
| data=structured_result, | ||
| subject=request.video_url, | ||
| schema=json.dumps(request.schema) |
There was a problem hiding this comment.
The schema= kwarg passed to publisher.publish(...) becomes a CloudEvents extension attribute, not the standard dataschema attribute. If the intent is to populate the CloudEvents dataschema field, pass dataschema=... instead (or rename the kwarg accordingly) so consumers see it in the standard place.
| schema=json.dumps(request.schema) | |
| dataschema=json.dumps(request.schema) |
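For reference, `dataschema` is a standard CloudEvents v1.0 context attribute, so placing the schema URI there keeps it where generic consumers look. A minimal sketch of the envelope as a plain dict (field values are illustrative, not the PR's publisher internals):

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

def make_cloudevent(event_type: str, source: str, data: Dict[str, Any],
                    dataschema: Optional[str] = None) -> Dict[str, Any]:
    # Minimal CloudEvents v1.0 envelope
    event = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "type": event_type,
        "source": source,
        "time": datetime.now(timezone.utc).isoformat(),
        "data": data,
    }
    if dataschema is not None:
        # Standard attribute; a custom "schema" key would be an extension
        event["dataschema"] = dataschema
    return event

evt = make_cloudevent(
    "com.eventrelay.video.analyzed.structured",
    "/video/analyzer",
    {"events": []},
    dataschema="https://example.com/schemas/video-analysis.json",
)
```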
| """ | ||
| Manually publish a video analysis event as a CloudEvent. | ||
|
|
||
| Supports multiple backends: | ||
| - pubsub: Google Cloud Pub/Sub | ||
| - http: HTTP webhook | ||
| - openwhisk: Apache OpenWhisk trigger | ||
| - file: Local file (for testing) | ||
|
|
||
| Example: |
This adds a manual `/publish-event` API that can emit CloudEvents without going through the core "YouTube URL → context → agents → outputs" flow. If that's not intended to be a public workflow, consider scoping it behind an internal flag/auth, moving it under a debug/test router, or removing it to avoid creating an alternate trigger path.
```python
future = self._pubsub_client.publish(
    self._topic_path,
    data,
    **attributes
)
message_id = future.result()
logger.info(f"Published CloudEvent {event.id} to Pub/Sub: {message_id}")
```
`_publish_pubsub` is async but calls `future.result()` directly, which blocks the event loop and can stall the API under load. Offload the blocking wait to a thread/executor (or don't wait synchronously for the publish ack) to keep the async path non-blocking.
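One way to keep the coroutine non-blocking is to hand the blocking `future.result()` call to the default executor. A sketch using a pre-resolved `concurrent.futures.Future` as a stand-in for the future returned by the Pub/Sub client:

```python
import asyncio
from concurrent.futures import Future

async def await_publish_ack(future: Future) -> str:
    # future.result() blocks the calling thread, so run it in a worker
    # thread instead of stalling the event loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, future.result)

# Stand-in for the future returned by pubsub_client.publish(...)
fut: Future = Future()
fut.set_result("message-123")
message_id = asyncio.run(await_publish_ack(fut))
```

If the ack is not needed in the response, an alternative is to attach a done-callback and return immediately, trading delivery confirmation for latency.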
```python
import json
import os
from datetime import datetime, timezone
```

`timezone` is imported but not used in this test module. With the repo's Ruff configuration (F401/F841 enabled for tests), this will fail lint. Remove the unused import (or use it explicitly) to keep CI green.

Suggested change:

```diff
-from datetime import datetime, timezone
+from datetime import datetime
```
| """ | ||
|
|
||
| import asyncio | ||
| import json |
There was a problem hiding this comment.
Import of 'json' is not used.
| import json |
```python
import asyncio
import json
import os
```

Import of `os` is not used.

Suggested change:

```diff
-import os
```
```python
    if isinstance(result.summary, str) and result.summary.strip().startswith("{"):
        return json.loads(result.summary)
except json.JSONDecodeError:
    pass
```

The `except` clause does nothing but `pass` and there is no explanatory comment.

Suggested change:

```diff
-    pass
+    logger.warning("Could not parse comparison JSON from Gemini summary")
```
Implements the direct YouTube URL → Gemini analysis → structured events → EventMesh/OpenWhisk execution path, including timestamped temporal extraction and JSON-structured outputs. Adds APIs and documentation to expose these capabilities end-to-end.
Ingestion & temporal understanding
Structured output
Eventing & execution
Surface area
Example (structured event extraction request):